#!/usr/bin/python3 -cimport os, sys; os.execv(os.path.dirname(sys.argv[1]) + "/../common/pywrap", sys.argv)

# This file is part of Cockpit.
#
# Copyright (C) 2013 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <https://www.gnu.org/licenses/>.

import base64
import re
import subprocess
import time

import testlib


@testlib.nondestructive
class TestLogin(testlib.MachineCase):
    def check_shell(self):
        b = self.browser
        b.wait_visible("#content")
        b.wait_text('#current-username', 'admin')
        if self.multihost_enabled:
            b.set_layout("mobile")
            b.click("#hosts-sel button")
            b.wait_in_text(".view-hosts .pf-m-current", "admin @")
            b.click("#hosts-sel button")
            b.set_layout("desktop")

    # @testlib.skipBeiboot("no local cockpit PAM config in beiboot mode")
    def testBasic(self):
        m = self.machine
        b = self.browser

        # Setup users and passwords
        m.execute("useradd user -c 'Barney Bär'")
        m.execute("echo user:abcdefg | chpasswd")

        admins_only_pam = """account    sufficient   pam_succeed_if.so uid = 0\\
account    required     pam_succeed_if.so user ingroup %s""" % m.get_admin_group()

        # Setup a special PAM config that disallows non-wheel users
        def deny_non_root(remote_filename):
            if m.image == "arch":
                self.sed_file(f'1 a {admins_only_pam}', remote_filename)
            else:
                self.sed_file(f'/nologin/a {admins_only_pam}', remote_filename)

        if not m.ws_container:
            deny_non_root("/etc/pam.d/cockpit")
        deny_non_root("/etc/pam.d/sshd")

        m.start_cockpit()
        b.open("/system")

        # Test banner
        # Test that we don't show banner when not specified
        b.wait_visible("#login-user-input")
        b.wait_not_visible("#banner")

        if m.ws_container:
            m.execute("podman exec ws printf '[Session]\nBanner = /host/etc/issue\n' > /etc/cockpit/cockpit.conf")
        else:
            m.execute("printf '[Session]\nBanner = /etc/issue\n' > /etc/cockpit/cockpit.conf")
        m.restart_cockpit()
        b.reload()
        b.wait_visible("#login-user-input")
        b.wait_visible("#banner")
        self.assertEqual(b.text("#banner-message"), m.execute("cat /etc/issue").rstrip())

        # Test non existent file
        m.execute("printf '[Session]\nBanner = /etc/non-existing-file\n' > /etc/cockpit/cockpit.conf")
        self.allow_journal_messages("error loading contents of banner: Failed to open file “/etc/non-existing-file”: No such file or directory")
        m.restart_cockpit()
        b.reload()
        b.wait_visible("#login-user-input")
        b.wait_not_visible("#banner")

        def cockpit_sessions():
            """Return list of users who have a running Cockpit session"""
            # unfortunately the session type isn't part of the list view/query
            # --json=short is very recent, not yet available in Fedora 40 and older; so parse lines
            users = []
            for line in m.execute("loginctl --no-legend list-sessions").strip().splitlines():
                fields = line.split()
                info = m.execute(f"loginctl show-session {fields[0]}")
                if "Type=web" in info and "State=closing" not in info:
                    users.append(fields[2])
            return users

        # Try to login as a non-existing user
        b.try_login("nonexisting", "blahblah")
        b.wait_text("#login-error-title", "Authentication failed")
        b.wait_text("#login-error-message", "Wrong user name or password")
        self.assertEqual(cockpit_sessions(), [])

        # Try to login as user with a wrong password
        b.try_login("user", "gfedcba")
        b.wait_text_not("#login-error-message", "")
        self.assertEqual(cockpit_sessions(), [])
        self.allow_journal_messages(".* user: Authentication failure.*")

        # Try to login as user with correct password
        b.try_login("user", "abcdefg")
        b.wait_text("#login-error-title", "Authentication failed" if m.ws_container else "Permission denied")
        self.assertEqual(cockpit_sessions(), [])

        # Try to login with disabled shell; this does not work on OSTree where
        # we log in through ssh
        if not m.ws_container:
            m.execute("usermod --shell /bin/false admin; sync")
            b.reload()
            b.try_login("admin", "foobar")
            b.wait_text_not("#login-error-title", "")
            self.assertIn(b.text("#login-error-title"),
                          ["Permission denied", "Authentication failed"])
            m.execute("usermod --shell /bin/bash admin; sync")

        # Login as admin
        b.open("/system")
        b.login_and_go()
        self.check_shell()

        if not m.ws_container:  # logs in via ssh, not cockpit-session
            # older coreutils shows the session type (web), newer ones the service name
            self.assertEqual(cockpit_sessions(), ["admin"])

        # reload, which should log us in with the cookie
        b.reload()
        self.check_shell()

        if not m.ws_container:  # logs in via ssh, not cockpit-session
            self.assertEqual(cockpit_sessions(), ["admin"])

        b.go("/users#/admin")
        b.enter_page("/users")
        b.wait_text("#account-user-name", "admin")
        try:
            m.execute("journalctl -p 7 SYSLOG_IDENTIFIER=cockpit-ws | grep 'cockpit-session: opening pam session'")
            self.fail("cockpit-session debug messages found")
        except subprocess.CalledProcessError:
            pass

        # Change login screen options
        b.logout()
        self.assertEqual(cockpit_sessions(), [])
        b.wait_visible("#option-group")
        m.execute("printf '[WebService]\nLoginTo = false\n' > /etc/cockpit/cockpit.conf")
        m.restart_cockpit()
        b.open("/system")
        b.wait_visible("#login")
        b.wait_not_visible("#option-group")

        # LoginTo= also disables direct URL
        b.open("/=192.168.99.99/")
        b.wait_visible("#login")
        b.wait_not_visible("#option-group")
        # logging in does not go via ssh (which would cause a connection failure)
        b.try_login()
        # this isn't the most helpful error message, but this is essentially hacking
        b.wait_text("#login-error-title", "Authentication failed")
        b.wait_text("#login-error-message", "Wrong user name or password")

        # Default options be to display these options
        m.execute("rm /etc/cockpit/cockpit.conf")
        m.restart_cockpit()
        b.open("/system")
        b.wait_visible("#option-group")

        # And now we remove ssh which affects the default
        if not m.ws_container:
            self.restore_file("/usr/bin/ssh")
            m.execute("rm /usr/bin/ssh")
            m.restart_cockpit()
            b.open("/system")
            b.wait_visible("#login")
            b.wait_not_visible("#option-group")

            # test login with tcsh
            if not m.image.startswith("rhel") and not m.image.startswith("centos") and not m.image == "arch":  # no tcsh in RHEL and in Arch Linux (TODO: available in [community])
                try:
                    m.execute("sed -r -i.bak '/^admin:/ s_:[^:]+$_:/bin/tcsh_' /etc/passwd")
                    b.login_and_go()
                    b.enter_page('/system')
                    b.wait_visible('.system-information')
                    b.logout()
                finally:
                    m.execute("mv /etc/passwd.bak /etc/passwd")

            # login with user shell that prints some stdout/err noise
            # having stdout output in ~/.bashrc confuses docker, so don't run on OSTree
            m.execute("cd ~admin; cp -a .bashrc .bashrc.bak; [ ! -e .profile ] || cp -a .profile .profile.bak; "
                      "echo 'echo noise-rc-out; echo noise-rc-err >&2' >> .bashrc; "
                      "echo 'echo noise-profile-out; echo noise-profile-err >&2' >> .profile")
            self.addCleanup(m.execute, "cd ~admin; mv .bashrc.bak .bashrc; "
                                       "if [ -e .profile.bak ]; then mv .profile.bak .profile; else rm .profile; fi")
            b.login_and_go()

        self.allow_journal_messages(r"pam_unix\(cockpit:auth\): authentication failure; .*",
                                    r"pam_unix\(cockpit:auth\): check pass; user unknown",
                                    r"pam_succeed_if\(cockpit:auth\): requirement .* not met by user .*",
                                    "noise-rc-.*")

    @testlib.skipWsContainer("logs in via ssh, not cockpit-session")
    def testLogging(self):
        m = self.machine
        b = self.browser

        def assert_messages(has_last, n_fail):
            if has_last:
                b.wait_in_text('#system_last_login', "Last successful login")
                b.wait_in_text('#system_last_login_from', "from")  # only present if IP was logged

            if n_fail:
                b.wait_in_text('#system_last_login', f'{n_fail} failed login')
                b.wait_in_text('#system_last_login_from', "from")
                b.wait_in_text('#system_last_login_success', "Last successful login")
            else:
                self.assertFalse(b.is_present('#system_last_login_success'))

        def verify_correct(has_last, n_fail):
            b.login_and_go('/system')
            assert_messages(has_last, n_fail)

            # reload and make sure it's still there (or not)
            b.reload()
            b.enter_page('/system')
            assert_messages(has_last, n_fail)

            b.logout()

        m.start_cockpit()

        # Clean out the relevant logfiles
        m.execute("truncate -s0 /var/log/{[bw]tmp,lastlog} /var/run/utmp")

        if m.image == "arch":
            self.sed_file("s/# deny = 3/deny = 4/", "/etc/security/faillock.conf")

        # First login should have no messages
        verify_correct(has_last=False, n_fail=0)

        # Next login should see the last login
        verify_correct(has_last=True, n_fail=0)

        # Do some bogus login attempts
        b.try_login('admin', 'xyz')
        b.wait_text("#login-error-title", "Authentication failed")
        b.try_login('admin', 'xyz')
        b.wait_text("#login-error-title", "Authentication failed")
        b.try_login('admin', 'xyz')
        b.wait_text("#login-error-title", "Authentication failed")

        # We should see those bogus attempts now
        verify_correct(has_last=False, n_fail=3)

        # But after that login, they should be gone again
        verify_correct(has_last=True, n_fail=0)

    @testlib.skipImage("Arch Linux has no pwquality by default", "arch")
    def testExpired(self):
        m = self.machine
        b = self.browser

        # with the ws container this happens over ssh
        if m.ws_container:
            self.restore_dir("/etc/ssh", restart_unit=self.sshd_service)
            m.execute("sed -i 's/.*ChallengeResponseAuthentication.*/ChallengeResponseAuthentication yes/' "
                      "/etc/ssh/sshd_config $(ls /etc/ssh/sshd_config.d/* 2>/dev/null || true)")
            m.execute(self.restart_sshd)

        # test steps below assume a pam_pwquality config with retry > 1; on some images authselect drops that setting
        if not m.image.startswith('debian') and not m.image.startswith('ubuntu') and not m.image.startswith("arch"):
            self.sed_file("/password.*requisite.*pam_pwquality/ s/$/ retry=3/", "/etc/pam.d/password-auth")

        m.execute("chage -d 0 admin")
        m.start_cockpit()
        b.open("/system")

        b.wait_visible("#login")
        b.wait_not_visible("#conversation-group")
        b.wait_visible("#password-group")
        b.wait_visible("#user-group")
        b.set_val('#login-user-input', "admin")
        b.set_val('#login-password-input', "foobar")
        b.click('#login-button')

        b.wait_visible("#conversation-group")
        b.wait_not_visible("#password-group")
        b.wait_not_visible("#user-group")
        if m.ws_container:
            b.wait_in_text("#conversation-prompt", "You are required to change your password")
        else:
            b.wait_in_text("#conversation-message", "You are required to change your password")
        b.set_val('#conversation-input', 'foobar')
        b.click('#login-button')

        # Type a bad password
        b.wait_visible("#conversation-group")
        b.wait_not_visible("#password-group")
        b.wait_not_visible("#user-group")

        b.wait_in_text("#conversation-prompt", "New password")
        b.set_val('#conversation-input', 'admin')
        b.click('#login-button')

        # We should see a message
        if m.ws_container:
            b.wait_in_text("#conversation-prompt", "BAD PASSWORD")
        else:
            b.wait_in_text("#conversation-message", "BAD PASSWORD")

        # Now choose a better password
        b.wait_not_present("#login-button:disabled")
        b.wait_visible("#conversation-group")
        b.wait_not_visible("#password-group")
        b.wait_not_visible("#user-group")
        b.wait_in_text("#conversation-prompt", "New password")
        b.set_val('#conversation-input', '123foobar!@#')
        b.click('#login-button')

        # Retype the password wrong
        b.wait_visible("#conversation-group")
        b.wait_not_visible("#password-group")
        b.wait_not_visible("#user-group")
        b.wait_in_text("#conversation-prompt", "Retype")
        b.set_val('#conversation-input', '123foobar!')  # wrong
        b.click('#login-button')

        # We should see a message
        if m.ws_container:
            b.wait_in_text("#conversation-prompt", "passwords do not match")
        else:
            b.wait_in_text("#conversation-message", "passwords do not match")

        # Type the password again
        b.wait_visible("#conversation-group")
        b.wait_not_visible("#password-group")
        b.wait_not_visible("#user-group")

        b.wait_in_text("#conversation-prompt", "New password")
        b.set_val('#conversation-input', '123foobar!@#')
        b.click('#login-button')

        # Now type it right
        b.wait_visible("#conversation-group")
        b.wait_not_visible("#password-group")
        b.wait_not_visible("#user-group")
        b.wait_in_text("#conversation-prompt", "Retype")
        b.set_val('#conversation-input', '123foobar!@#')
        b.click('#login-button')

        self.check_shell()

        self.allow_journal_messages('.*You are required to change your password immediately.*',
                                    '.*user account or password has expired.*',
                                    '.*BAD PASSWORD.*',
                                    '.*Sorry, passwords do not match.')
        self.allow_restart_journal_messages()

    @testlib.skipWsContainer("mock-pam-conv from cockpit-tests not installed")
    def testConversation(self):
        m = self.machine
        b = self.browser
        conf = "/etc/pam.d/cockpit"
        if m.ostree_image:
            conf = "/etc/pam.d/sshd"
            self.restore_dir("/etc/ssh", restart_unit=self.sshd_service)
            m.execute("sed -i 's/.*ChallengeResponseAuthentication.*/ChallengeResponseAuthentication yes/' "
                      "/etc/ssh/sshd_config $(ls /etc/ssh/sshd_config.d/* 2>/dev/null || true)")
            m.execute(self.restart_sshd)

        # On Arch Linux the ordering matters due to an auth include for system-remote-login
        if self.machine.image == "arch":
            self.sed_file('1 a auth       required    mock-pam-conv-mod.so', conf)
        else:
            self.sed_file('5 a auth       required    mock-pam-conv-mod.so', conf)

        m.start_cockpit()
        b.open("/system")

        # Try to login as a non-existing user
        b.try_login("nonexisting", "blahblah")

        b.wait_visible("#conversation-group")
        b.wait_not_visible("#password-group")
        b.wait_not_visible("#user-group")
        b.wait_in_text("#conversation-prompt", "life the universe")
        b.set_val('#conversation-input', '43')
        b.click('#login-button')
        if m.ws_container:
            # ssh asks for password again, despite NumberOfPasswordPrompts=1
            b.wait_in_text("#conversation-prompt", "nonexisting@127.0.0.1's password")
            b.set_val('#conversation-input', 'blahblah')
            b.click('#login-button')
        b.wait_text_not("#login-error-title", "")

        b.try_login("admin", "foobar")
        b.wait_visible("#conversation-group")
        b.wait_not_visible("#password-group")
        b.wait_not_visible("#user-group")
        b.wait_in_text("#conversation-prompt", "life the universe")
        b.set_val('#conversation-input', '42')
        b.click('#login-button')

        self.check_shell()

        self.allow_restart_journal_messages()

    @testlib.skipImage("No tlog", "debian-*", "ubuntu-*", "arch")
    @testlib.skipOstree("No tlog")
    @testlib.skipImage("FIXME: tlog messes up bridge frame protocol", "rhel-8-10")
    def testSessionRecordingShell(self):
        m = self.machine
        b = self.browser

        m.execute("useradd user --shell /usr/bin/tlog-rec-session")
        m.execute("echo user:abcdefg | chpasswd")
        # this doesn't actually record anything, but logging into cockpit should work
        m.start_cockpit()
        b.login_and_go("/system", user="user", password="abcdefg")
        b.wait_visible(".pf-v5-c-alert:contains('Web console is running in limited access mode.')")
        b.logout()

        self.allow_journal_messages(".*value for the SHELL variable was not found the /etc/shells.*",
                                    "Locale charset is ANSI.*",
                                    "Assuming locale environment is lost.*",
                                    "ATTENTION! Your session is being recorded!")

    def curl_auth(self, url, userpass):
        header = "Authorization: Basic " + base64.b64encode(userpass.encode()).decode()
        return subprocess.check_output(['/usr/bin/curl', '-s', '-k', '-D', '-', '--header', header,
                                        f'http://{self.machine.web_address}:{self.machine.web_port}{url}'],
                                       universal_newlines=True)

    def curl_auth_code(self, url, userpass):
        lines = self.curl_auth(url, userpass).splitlines()
        self.assertGreater(len(lines), 0)
        tokens = lines[0].split(' ', 2)
        self.assertEqual(len(tokens), 3)
        return int(tokens[1])

    def testRaw(self):
        self.machine.start_cockpit()
        time.sleep(0.5)
        self.assertEqual(self.curl_auth_code('/cockpit/login', ''), 401)
        self.assertEqual(self.curl_auth_code('/cockpit/login', 'foo:'), 401)
        self.assertEqual(self.curl_auth_code('/cockpit/login', 'foo:bar\n'), 401)
        self.assertEqual(self.curl_auth_code('/cockpit/login', 'foo:bar:baz'), 401)
        self.assertEqual(self.curl_auth_code('/cockpit/login', ':\n\n'), 401)
        self.assertIn(self.curl_auth_code('/cockpit/login', 'admin:bar'), [401, 403])
        self.assertEqual(self.curl_auth_code('/cockpit/login', 'foo:bar'), 401)
        self.assertIn(self.curl_auth_code('/cockpit/login', 'admin:' + 'x' * 4000), [401, 403])
        self.assertEqual(self.curl_auth_code('/cockpit/login', 'x' * 4000 + ':bar'), 401)
        self.assertEqual(self.curl_auth_code('/cockpit/login', 'a' * 4000 + ':'), 401)
        self.assertEqual(self.curl_auth_code('/cockpit/login', 'a' * 4000 + ':b\nc'), 401)
        self.assertEqual(self.curl_auth_code('/cockpit/login', 'a' * 4000 + ':b\nc\n'), 401)

        self.allow_journal_messages("Returning error-response ... with reason .*",
                                    r"pam_unix\(cockpit:auth\): authentication failure; .*",
                                    r"pam_unix\(cockpit:auth\): check pass; user unknown",
                                    r"pam_succeed_if\(cockpit:auth\): requirement .* not met by user .*",
                                    "couldn't parse login input: Malformed input",
                                    "couldn't parse login input: Authentication failed")

    @testlib.skipImage("No SELinux", "debian-*", "ubuntu-*", "arch")
    @testlib.skipOstree("No semanage")
    def testSELinuxRestrictedUser(self):
        m = self.machine
        b = self.browser

        # non-admin user_u
        m.execute("useradd unpriv; echo 'unpriv:foobar' | chpasswd; semanage login -a -s user_u unpriv")
        self.addCleanup(m.execute, "semanage login -d -s user_u unpriv")
        m.start_cockpit()
        b.login_and_go("/system", user="unpriv")
        # generate lastlog entry
        b.logout()
        b.login_and_go("/system", user="unpriv")
        # not an admin
        b.wait_visible(".pf-v5-c-alert:contains('Web console is running in limited access mode.')")

        # ws container does not keep login history
        if not m.ws_container:
            b.wait_in_text('#system_last_login', "Last successful login")
            b.wait_in_text('#system_last_login_from', "web console")

        b.logout()
        # not allowed to restricted users
        self.allow_journal_messages('.*type=1400.*avc:  denied  { map }.*comm="cockpit-pcp".*')
        self.allow_journal_messages('.*type=1400.*avc:  denied .* comm="sudo".*')
        self.allow_journal_messages('.*type=1400.*avc:  denied .* comm="systemd".*')
        self.allow_journal_messages('.*type=1400.*avc:  denied  { watch } .* comm="cockpit-bridge".*')
        self.allow_journal_messages('.*sudo:.* setresuid(.*): Operation not permitted')
        self.allow_journal_messages('.*sudo: error initializing audit plugin sudoers_audit')
        # https://bugzilla.redhat.com/show_bug.cgi?id=1727887
        self.allow_journal_messages('.*type=1400.*avc:  denied  { connectto } .* path="/run/user/.*/bus" scontext=user_u:user_r:user_t:s0.*')

        # sysadm_u
        # not allowed by default in RHEL 8
        if m.image.startswith("rhel-8"):
            m.execute("setsebool -P ssh_sysadm_login 1")
            self.addCleanup(m.execute, "setsebool -P ssh_sysadm_login 0")
        m.execute("useradd priv; echo 'priv:foobar' | chpasswd")
        m.execute("usermod -aG wheel priv")
        m.execute("semanage login -a -s sysadm_u priv")
        self.addCleanup(m.execute, "semanage login -d -s sysadm_u priv")
        b.login_and_go("/system", user="priv")
        # passing login info memfd should work
        b.logout()
        b.login_and_go("/system", user="priv")
        # ws container does not keep login history
        if not m.ws_container:
            b.wait_in_text('#system_last_login', "Last successful login")
            b.wait_in_text('#system_last_login_from', "web console")

        b.go("/playground/test")
        b.enter_page("/playground/test")
        b.click(".super-channel button")
        b.wait_in_text(".super-channel span", 'result: ')

        if m.image.startswith("rhel-8"):
            # https://bugzilla.redhat.com/show_bug.cgi?id=1814569
            self.assertIn('result: access-denied', b.text(".super-channel span"))
        else:
            self.assertIn('result: uid=0', b.text(".super-channel span"))

    def testNoAdminGroup(self):
        m = self.machine
        b = self.browser

        # Create a user that is not in wheel but can sudo
        m.execute("useradd user -s /bin/bash -c User")
        m.execute("echo user:foobar | chpasswd")
        self.write_file("/etc/sudoers.d/user", "user ALL=(ALL) ALL")
        self.password = "foobar"

        self.login_and_go("/system", user="user")

        # session is privileged
        b.switch_to_top()
        b.check_superuser_indicator("Administrative access")
        b.enter_page('/system')

        # shutdown button should be enabled and working
        b.click("#reboot-button")
        b.wait_popup("shutdown-dialog")
        b.wait_in_text(f"#shutdown-dialog button{self.danger_btn_class}", 'Reboot')
        b.click("#delay")
        b.click("button:contains('5 minutes')")
        b.wait_text("#delay .pf-v5-c-select__toggle-text", "5 minutes")
        b.click(f"#shutdown-dialog button{self.danger_btn_class}")

        # cancel reboot
        b.wait_in_text('#system-health-shutdown-status-text', "Scheduled reboot")
        b.click("#system-health-shutdown-status-cancel-btn")
        b.wait_not_present('#system-health-shutdown-status')

    def testUnsupportedBrowser(self):
        m = self.machine
        b = self.browser

        m.start_cockpit()
        # pretend browser doesn't support a required capability
        b.bidi("script.addPreloadScript", functionDeclaration="""() => {
               window.WebSocket = undefined; }""")
        b.open("/system")
        b.wait_visible("#unsupported-browser")
        b.wait_not_visible("#login-fatal")
        b.wait_not_visible("#login")
        b.wait_not_visible("#login-details")

    def testBypassSoftRequirements(self):
        m = self.machine
        b = self.browser

        m.start_cockpit()
        b.bidi("script.addPreloadScript", functionDeclaration="""() => {
               window.CSS.supports = () => false; }""")
        b.open("/system")
        b.wait_visible("#unsupported-browser")
        b.wait_in_text("#bypass-browser-check", "Bypass browser check")
        b.click(".pf-v5-c-expandable-section")

        b.set_val('#login-user-input', "admin")
        b.set_val('#login-password-input', "foobar")
        b.click("#login-button")

        self.check_shell()

    @testlib.skipWsContainer("no local config with cockpit/ws container")
    def testFailingWebsocket(self, safari=False, cacert=False):
        m = self.machine
        b = self.browser

        # Cause cockpit-ws to reject WebSocket connections
        m.write("/etc/cockpit/cockpit.conf", "[WebService]\nOrigins=foo.bar.com\n")
        self.allow_journal_messages("received request from bad Origin: .*",
                                    "connection unexpectedly closed by peer",
                                    "Received invalid handshake request from the client")
        self.allow_browser_errors(".*")

        m.start_cockpit()
        # Really start Cockpit to make sure it has generated all its certificates.
        m.execute("systemctl start cockpit")

        if safari:
            b.set_user_agent("Safari/300")

        if cacert:
            m.write("/etc/cockpit/ws-certs.d/0-self-signed-ca.pem", "FAKE CERT FOR TESTING\n")
        else:
            m.execute("rm -f /etc/cockpit/ws-certs.d/0-self-signed-ca.pem")

        # Log in.
        b.open("/system")
        b.wait_visible("#login")
        b.set_val('#login-user-input', "admin")
        b.set_val('#login-password-input', "foobar")
        b.click('#login-button')

        b.wait_visible("#early-failure")
        if safari and cacert:
            b.wait_visible("#safari-cert-help")
        else:
            b.wait_not_present("#safari-cert-help")

    @testlib.skipBrowser("Enough when only chromium pretends to be a different browser", "firefox")
    @testlib.skipWsContainer("no local config with cockpit/ws container")
    def testFailingWebsocketSafari(self):
        self.testFailingWebsocket(safari=True, cacert=True)

    @testlib.skipBrowser("Enough when only chromium pretends to be a different browser", "firefox")
    @testlib.skipWsContainer("no local config with cockpit/ws container")
    def testFailingWebsocketSafariNoCA(self):
        self.testFailingWebsocket(safari=True, cacert=False)

    def testFaillock(self):
        m = self.machine
        b = self.browser

        module = 'faillock'

        def logfile(user):
            return '/var/run/faillock/' + user

        def expect_fail_count(expected, user="admin", must_exist=True):
            if must_exist:
                cksum = m.execute(f"cksum {logfile(user)}")

            output = m.execute(f"{module} --user {user}")

            n_lines = len(output.splitlines())
            if n_lines:
                n_lines -= 2  # remove the header, if it's there

            self.assertEqual(expected, n_lines)

            # make sure the above command didn't change the file
            if must_exist:
                self.assertEqual(cksum, m.execute(f"cksum {logfile(user)}"))

        def expect_failed_login(user, password, n_failed):
            b.try_login(user, password)
            b.wait_text_not("#login-error-title", "")
            # this depends on the use case and OS implementation -- should just be something sensible
            self.assertIn(b.text("#login-error-title"),
                          ["Permission denied", "Authentication failed", "Wrong user name or password"])
            expect_fail_count(n_failed, user=user)

        def expect_successful_login(user, password):
            b.login_and_go('/system', user=user, password=password)
            expect_fail_count(0, user=user)
            b.logout()

        self.enable_root_login()

        # ensure we have no module in our pam config already
        # arch has faillock enabled by default
        if m.image != "arch":
            m.execute(f"! grep -r '{module}' /etc/pam.d")

        # add it to the pam config
        if m.image.startswith('debian-') or m.image.startswith('ubuntu'):
            # enable pam_faillock.  see example in pam_faillock(8)
            self.sed_file(r"/fallback if no module succeeds/ s/^/"
                          r"auth [default=die] pam_faillock.so authfail deny=4\n"
                          r"auth sufficient pam_faillock.so authsucc deny=4\n/",
                          "/etc/pam.d/common-auth")
        elif m.image == "arch":
            self.sed_file("s/# deny = 3/deny = 4/", "/etc/security/faillock.conf")
        else:
            # see https://access.redhat.com/solutions/62949
            self.sed_file("""/pam_unix/ {
                      s/sufficient/[success=1 default=ignore]/\n
                      aauth [default=die] pam_faillock.so authfail audit deny=4 unlock_time=600\n
                      aauth sufficient pam_faillock.so authsucc audit deny=4 unlock_time=600\n
                      }""", "/etc/pam.d/password-auth")

        m.execute(f"grep -r '{module}' /etc/pam.d")

        m.start_cockpit()

        # start from a clean slate
        clean_cmd = f"rm -f {logfile('admin')}"
        self.addCleanup(m.execute, clean_cmd)
        m.execute(clean_cmd)

        # make sure we can still login
        b.login_and_go("/system")
        b.logout()

        # and it should show zero fails
        expect_fail_count(0, must_exist=False)

        # try three bogus login attempts
        for n in [1, 2, 3]:
            expect_failed_login("admin", "bad", n)
        expect_fail_count(3)

        # make sure we can still login
        expect_successful_login("admin", "foobar")

        # after the success, try three more bogus login attempts
        for n in [1, 2, 3]:
            expect_failed_login("admin", "bad", n)
        expect_successful_login("admin", "foobar")

        # now try four, which should lock the account
        for n in [1, 2, 3, 4]:
            expect_failed_login("admin", "bad", n)

        # logging in should fail now
        expect_failed_login("admin", "foobar", n)

        # having given the correct password the last time should not have helped things
        expect_failed_login("admin", "foobar", n)

        # but we can reset the lockout
        m.execute(module + " --reset --user admin")
        expect_fail_count(0)

        # and login again
        expect_successful_login("admin", "foobar")

        # ws container logs in via sshd, which forbids root logging in with a password
        if not m.ws_container:
            # make sure root never gets locked out
            for n in range(1, 10):
                expect_failed_login("root", "bad", n)
            expect_successful_login("root", "foobar")

        self.allow_journal_messages(".*[aA]ccount .*locked due to .* failed logins.*")
        self.allow_journal_messages(".*minutes left to unlock.*")

    @testlib.skipWsContainer("root logins disabled by default with ssh")
    def testPamAccess(self):
        b = self.browser
        m = self.machine

        m.start_cockpit()
        b.open("/system")

        # root login is disabled by default via /etc/cockpit/disallowed-users on everything except RHEL 8
        if not m.image.startswith("rhel-8"):
            b.try_login("root", "foobar")
            b.wait_in_text("#login-error-title", "Permission denied")
            b.wait_not_visible("#login-error-message")

        # disable root login with pam_access
        self.enable_root_login()
        self.write_file("/etc/security/access.conf", "- : root : ALL\n", append=True)
        self.sed_file("1 aaccount required pam_access.so", "/etc/pam.d/cockpit")
        b.try_login(user="root")
        b.wait_in_text("#login-error-title", "Permission denied")
        b.wait_not_visible("#login-error-message")

        self.allow_journal_messages(r"cockpit-session: user account access failed.*root.*")

    @testlib.skipWsContainer("sssd/cockpit-session not available with ws container")
    @testlib.skipImage("sssd not currently in testing", "debian-testing")
    def testClientCertAuthentication(self):
        m = self.machine

        if m.image.startswith("debian") or m.image.startswith("ubuntu"):
            # on Debian/Ubuntu, an unconfigured sssd fails to start, so only restart it at the end if it was running before
            # also, sssd is split into several services
            self.restore_dir("/etc/sssd", post_restore_action="systemctl stop 'sssd*'")
        else:
            self.restore_dir("/etc/sssd", restart_unit="sssd")

        m.execute("useradd alice; echo alice:foobar123 | chpasswd")
        m.upload(["alice.pem", "alice.key", "alice-expired.pem", "bob.pem", "bob.key"], self.vm_tmpdir,
                 relative_dir="src/tls/ca/")
        alice_cert = self.vm_tmpdir + "/alice.pem"
        alice_cert_expired = self.vm_tmpdir + "/alice-expired.pem"
        alice_key = self.vm_tmpdir + "/alice.key"
        alice_cert_key = ['--cert', alice_cert, '--key', alice_key]

        # set up local (NIS) sssd provider and certificate mapping
        # newer sssd drops "file" provider, but the "proxy" provider cannot do this yet in old versions
        if m.image in ["debian-stable", "ubuntu-2204"]:
            id_provider = "id_provider = files"
        else:
            id_provider = """id_provider = proxy
                             local_auth_policy = only
                             proxy_lib_name = files"""

        self.write_file("/etc/sssd/sssd.conf", f"""
[sssd]
domains = local

[domain/local]
{id_provider}

[certmap/local/alice]
# Requires sssd >= 2.6.1 and installing sssd_auth_ca_db.pem; with earlier sssd this is completely unsafe
matchrule = <SUBJECT>^DC=LAN,DC=COCKPIT,CN=alice$
""", perm="0600")
        m.execute("systemctl restart sssd")

        # ensure sssd certificate lookup without validation works
        user_obj = m.execute('busctl call org.freedesktop.sssd.infopipe /org/freedesktop/sssd/infopipe/Users '
                             'org.freedesktop.sssd.infopipe.Users FindByCertificate s -- '
                             """"$(cat %s)" | sed 's/^o "//; s/"$//' """ % alice_cert)
        self.assertEqual(m.execute('busctl get-property org.freedesktop.sssd.infopipe ' + user_obj.strip() +
                                   ' org.freedesktop.sssd.infopipe.Users.User name').strip(),
                         's "alice"')

        err = m.execute('! busctl call org.freedesktop.sssd.infopipe /org/freedesktop/sssd/infopipe/Users '
                        'org.freedesktop.sssd.infopipe.Users FindByValidCertificate s -- '
                        """"$(cat %s)" 2>&1""" % alice_cert)
        self.assertIn("Certificate authority file not found", err)

        # install our CA, so that sssd can validate
        with open("src/tls/ca/ca.pem") as f:
            m.write("/etc/sssd/pki/sssd_auth_ca_db.pem", f.read())
        u = m.execute('busctl call org.freedesktop.sssd.infopipe /org/freedesktop/sssd/infopipe/Users '
                      'org.freedesktop.sssd.infopipe.Users FindByCertificate s -- '
                      """"$(cat %s)" | sed 's/^o "//; s/"$//' """ % alice_cert)
        self.assertEqual(u, user_obj)

        # These tests have to be run with curl, as chromium-headless does not support selecting/handling client-side
        # certificates; it just rejects cert requests. For interactive tests, grab src/tls/ca/alice.p12 and import
        # it into the browser.

        def do_test(authopts, expected, not_expected=None, session_leader=None):
            m.start_cockpit(tls=True)
            output = m.execute(['curl', '-ksS', '-D-', *authopts, 'https://localhost:9090/cockpit/login'])
            for s in expected:
                self.assertIn(s, output)
            for s in (not_expected or []):
                self.assertNotIn(s, output)
            # sessions/users often hang around in State=closing for a long time, ignore these
            if session_leader:
                m.execute('until [ "$(loginctl show-user --property=State --value alice)" = "active" ]; do sleep 1; done')
                sessions = m.execute('loginctl show-user --property=Sessions --value alice').strip().split()
                self.assertGreaterEqual(len(sessions), 1)
                for session in sessions:
                    out = m.execute('loginctl session-status ' + session)
                    # Skip manager session
                    if "Class: manager" in out:
                        continue
                    if "State: active" in out:  # skip closing sessions
                        self.assertIn(session_leader, out)
                        self.assertIn('cockpit-bridge', out)
                        # systemd < 255: "Service: cockpit; type web; class user"
                        # systemd ≥ 255: "Service: cockpit\n   Type: web\n Class: user"
                        self.assertRegex(out, r"Service:\s+cockpit")
                        self.assertRegex(out, "[tT]ype.*web")
                        break
                else:
                    self.fail("no active session for active user")

                # sessions time out after 10s, but let's not wait for that
                m.execute('loginctl terminate-session ' + sessions[0])
                # wait until the session is gone
                m.execute("while loginctl show-user alice | grep -q 'State=active'; do sleep 1; done")

            m.stop_cockpit()

        # from sssd
        self.allow_journal_messages("alice is not allowed to run sudo.*")

        # cert auth should not be enabled by default
        do_test(alice_cert_key, ["HTTP/1.1 401 Authentication failed"])
        # password auth should work
        do_test(['-u', 'alice:foobar123'],
                ['HTTP/1.1 200 OK', '"csrf-token"'],
                session_leader='cockpit-session')

        # enable cert based auth
        m.write("/etc/cockpit/cockpit.conf", '[WebService]\nClientCertAuthentication = true\n', append=True)
        # cert auth should work now
        do_test(alice_cert_key, ['HTTP/1.1 200 OK', '"csrf-token"'])
        # password auth, too
        do_test(['-u', 'alice:foobar123'],
                ['HTTP/1.1 200 OK', '"csrf-token"'],
                session_leader='cockpit-session')

        # another certificate gets rejected
        self.allow_journal_messages("cockpit-session: No matching user for certificate")
        do_test(["--cert", self.vm_tmpdir + "/bob.pem", "--key", self.vm_tmpdir + "/bob.key"],
                ["HTTP/1.1 401 Authentication failed"],
                not_expected=["crsf-token"])

        # check expired certificate
        m.start_cockpit(tls=True)
        journal_cursor = self.machine.journal_cursor()
        m.execute(f'! curl -ksS --cert {alice_cert_expired} --key {alice_key} https://localhost:9090/cockpit/login')
        m.stop_cockpit()
        testlib.wait(lambda: re.search(r'.*Invalid TLS peer certificate.* expired',
                     m.execute(f"journalctl -ocat --cursor '{journal_cursor}' SYSLOG_IDENTIFIER=cockpit-tls")),
                     tries=10)
        self.allow_journal_messages('.*Invalid TLS peer certificate.* expired')
        self.allow_journal_messages('.*TLS handshake failed: Error in the certificate verification.*')

        # disallow password auth
        m.write("/etc/cockpit/cockpit.conf", "[Basic]\naction = none\n", append=True)
        do_test(alice_cert_key, ['HTTP/1.1 200 OK', '"csrf-token"'])
        do_test(['-u', 'alice:foobar123'],
                ['HTTP/1.1 401 Authentication disabled'],
                not_expected=["crsf-token"])

        # wwithout a CA, alice's cert fails
        m.execute("rm /etc/sssd/pki/sssd_auth_ca_db.pem")
        self.allow_journal_messages("cockpit-session: Failed to map .* Invalid certificate provided")
        self.allow_journal_messages("cockpit-session: Failed to map .* Certificate authority file not found")
        do_test(alice_cert_key, ['HTTP/1.1 401 Authentication failed'])

        # sssd-dbus not available
        self.allow_journal_messages("cockpit-session: Failed to map .* Could not activate remote peer.*")
        self.allow_journal_messages("cockpit-session: Failed to map .* Unit sssd-ifp.service is masked.")
        m.execute("systemctl mask sssd-ifp; systemctl stop sssd-ifp")
        do_test(alice_cert_key, ["HTTP/1.1 401 Authentication failed"],
                not_expected=["crsf-token"])
        m.execute("systemctl unmask sssd-ifp")

    def testServer(self):
        m = self.machine
        b = self.browser

        m.execute("useradd --create-home user")
        m.execute("echo user:foobar | chpasswd")
        m.start_cockpit()
        # the quick succession of login/logout is too much for packagekit's brain
        self.disable_preload("packagekit", "playground", "systemd")

        def check_server(server, expect_fp_ack):
            b.open('/')
            b.wait_visible("#login")
            # start with no known keys every time
            b.eval_js("""window.localStorage.setItem("known_hosts", "{ }")""")
            b.set_val('#login-user-input', "user")
            b.set_val('#login-password-input', "foobar")
            b.click("#show-other-login-options")
            b.set_val("#server-field", server)
            b.click("#login-button")

            if expect_fp_ack:
                b.wait_in_text("#hostkey-title", "New host")
                b.wait_in_text("#hostkey-message-1", "for the first time")
                b.click("#login-button")

            b.wait_visible('#content')
            b.enter_page('/system')
            b.wait_visible('.system-information')
            b.logout()

        # by name
        check_server("localhost", expect_fp_ack=True)
        # by name and port
        check_server("localhost:22", expect_fp_ack=True)
        # by IPv4 address; 127.0.0.1 is treated specially as ignore_hostkey fallback, and does not require FP
        check_server("127.0.0.1", expect_fp_ack=False)
        # by IPv4 address and port
        check_server("127.0.0.1:22", expect_fp_ack=False)
        # by IPv6 address
        check_server("::1", expect_fp_ack=True)
        # by IPv6 address and port
        check_server("[::1]:22", expect_fp_ack=True)

    # enable this once our cockpit/ws container can beiboot
    @testlib.skipWsContainer("client setup does not work with ws container")
    def testSSH(self):
        m = self.machine
        b = self.browser

        # this matches our bots test VMs
        my_ip = "172.27.0.15"

        m.start_cockpit()

        def try_login(user, password, server=None, expect_hostkey=False):
            b.open("/")
            b.set_val('#login-user-input', user)
            b.set_val('#login-password-input', password)
            b.click("#show-other-login-options")
            b.set_val("#server-field", server or my_ip)
            b.click("#login-button")
            if expect_hostkey:
                b.wait_in_text("#hostkey-message-1", server or my_ip)
                b.wait_in_text("#hostkey-fingerprint", "SHA256:")
                b.click("#login-button")

        def check_no_processes():
            m.execute(f"while pgrep -af '[s]sh .* {my_ip}' >&2; do sleep 1; done")
            m.execute("while pgrep -af '[c]ockpit.beiboot' >&2; do sleep 1; done")

        def check_session(server=None):
            b.wait_visible('#content')
            b.enter_page('/system')
            b.wait_visible('.system-information')
            m.execute(f"pgrep -af '[s]sh .* -l admin .*{server or my_ip}'")
            m.execute(f"pgrep -af '[c]ockpit.beiboot.*{server or my_ip}'")
            b.logout()
            check_no_processes()

        def check_store_hostkey(expected: str) -> None:
            db_hostkey = b.eval_js(f'JSON.parse(window.localStorage.getItem("known_hosts"))["{my_ip}"]')
            if m.image.startswith('debian') or m.image.startswith('ubuntu'):
                # these OSes encrypt host keys, so don't compare them
                db_hostkey = ' '.join(db_hostkey.split()[1:])
                expected = ' '.join(expected.split()[1:])
            self.assertEqual(db_hostkey, expected)

        # successful login through SSH
        try_login("admin", "foobar", expect_hostkey=True)
        check_session()
        # wrote full host key into session storage
        # some OpenSSH versions add a comment here, filter that out
        real_hostkey = m.execute(f"ssh-keyscan -t ssh-ed25519 {my_ip} | grep -v ^#")
        check_store_hostkey(real_hostkey)

        # host key is now known, but wrong password
        try_login("admin", "wrong")
        b.wait_text("#login-error-title", "Authentication failed")
        b.wait_text("#login-error-message", "Wrong user name or password")
        check_no_processes()
        # goes back to normal login form
        b.wait_visible('#login-user-input')

        # colliding usernames; user names in "Connect to:" are *not* supported,
        # but pin down the behaviour
        try_login("admin", "foobar", server=f"other@{my_ip}")
        check_session()

        # IPv6
        try_login("admin", "foobar", server="::1", expect_hostkey=True)
        check_session(server="::1")
        try_login("admin", "foobar", server="[::1]:22")
        check_session(server="::1")

        # empty password
        self.write_file("/etc/ssh/sshd_config.d/01-empty-password.conf", "PermitEmptyPasswords yes",
                        post_restore_action=self.restart_sshd)
        m.execute(self.restart_sshd)
        m.execute("passwd -d admin")
        try_login("admin", "")
        check_session()
        m.execute("echo 'admin:foobar' | chpasswd")
        if m.image == 'arch':
            # HACK: PermitEmptyPasswords somehow breaks regular password auth
            m.execute("rm /etc/ssh/sshd_config.d/01-empty-password.conf")
            m.execute(self.restart_sshd)

        # non-matching host key
        bad_key = "172.27.0.15 ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAINKEIUL5s7ebg5Y6JdYNq4+mAaaaaaP1VRBDUiVdHT3R"
        b.eval_js(f"""window.localStorage.setItem('known_hosts', '{{"{my_ip}": "{bad_key}"}}')""")

        # first try with wrong credentials
        try_login("admin", "wrong")
        b.wait_text("#hostkey-title", f"{my_ip} key changed")
        b.wait_in_text("#hostkey-fingerprint", "SHA256:")
        b.click("#login-button.pf-m-danger")
        b.wait_text("#login-error-title", "Authentication failed")
        # no details, it's a PAM conversation
        b.wait_not_visible("#login-error-message")
        check_no_processes()
        # new host key is accepted and updated in db
        # confirmation replaces (not amends) known key
        check_store_hostkey(real_hostkey)

        # now with correct credentials, does not ask for host key any more
        try_login("admin", "foobar")
        check_session()
        check_store_hostkey(real_hostkey)

        # good credentials with bad key
        b.eval_js(f"""window.localStorage.setItem('known_hosts', '{{"{my_ip}": "{bad_key}"}}')""")
        try_login("admin", "foobar")
        b.wait_text("#hostkey-title", f"{my_ip} key changed")
        b.wait_in_text("#hostkey-fingerprint", "SHA256:")
        b.click("#login-button.pf-m-danger")
        check_session()
        check_store_hostkey(real_hostkey)


if __name__ == '__main__':
    testlib.test_main()
